package com.xxx.flink.customsource;

import com.xxx.flink.pojo.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author guwq
 * @since 2023/7/25
 */
public class CustomSource implements SourceFunction<Event> {

    private boolean running = true;

    @Override
    public void run(SourceContext sourceContext) throws Exception {
        Random random = new Random();
        String [] users = {"Bob", "Lily", "Mary", "Katty"};
        String [] urls = {"./home", "/me", "/product", "/cart"};

        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(1000L);
        }

    }

    @Override
    public void cancel() {
        running = false;
    }
}
